跳到主要内容

Java 并发编程 CAS 乐观锁

CAS 的概念

参考资料 深入浅出Java多线程 CAS与原子操作 参考资料 不可不说的Java“锁”事

注意:CAS 是通过硬件(CPU 指令)命令保证了原子性的,而下面的过程都是基于 CAS 是原子操作这一重要原则

CAS 全称 Compare And Swap(比较与交换),是一种 无锁算法。在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。(注意,这玩意不是锁,只不过它经常和自旋锁一起出现)

CAS 算法实现了区别于 synchronized 同步锁的一种乐观锁。JDK5 之前 Java 语言是靠 synchronized 关键字保证同步的,这是一种独占锁,也是悲观锁

在 CAS 中,有这样三个值

  • V:要更新的变量(var)
  • E:预期值(expected)
  • N:新值(new)

比较并交换的过程如下:

  1. 判断 V 是否等于 E,如果等于,将 V 的值设置为 N;
  2. 如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新,什么都不做。

这里的预期值 E 本质上指的是 “旧值”。

以一个简单的使用 CAS 的例子来解释这个过程:

1、如果有一个多个线程共享的变量 x 原本等于 10,现在在线程 A 中,想把它设置为新的值 11;

2、首先用 x 去与 10 对比,发现它等于 10,说明没有被其它线程改过,那就把它设置为新的值 11,此次 CAS 成功,x 的值被设置成了 11;

3、如果不等于 10,说明 x 被其它线程改过了(比如现在 x 的值为 11),那么就什么也不做,此次 CAS 失败,x 的值仍然为 10。

image.png

在这个例子中:

  • x 就是 V
  • 10 就是 E
  • 11 就是 N。

⚠ 那有没有可能在判断了 x 为 10 之后,正准备更新它的新值的时候,被其它线程更改了 i 的值呢?

不会的,因为 CAS 是一种原子操作,它是一种系统原语,是一条 CPU 的原子指令,从 CPU 层面保证它的原子性。当多个线程同时使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

CAS 是怎么工作的

如下,传统的采用 synchronized 方式的同步

public class Temp {
static class Storage {
int count;
public synchronized void add() {
count++;
}
}

public static void main(String[] args) throws InterruptedException {
long time = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(100);
Storage storage = new Storage();

// 创建 100 个线程
for (int i = 0; i < 100; i++) {
new Thread(()-> {
for (int j = 0; j < 10000; j++) {
storage.add();
}
countDownLatch.countDown();
}).start();
}

countDownLatch.await();
System.out.println("当前数量为:" + storage.count);
System.out.println("用时:" + (System.currentTimeMillis() - time));
}
}

改造成 CAS 的方式(注意,这个案例因为无法直接操作 CPU 所以还是用的 synchronized 来保证 CAS 的原子性)

public class Temp {
static class Storage {
int count;

public void add() {
// 让它一直判断,所以这里写个死循环(自旋),直到修改成功
while (!compareAndSwap(this.count, this.count + 1)) {
Thread.onSpinWait();
}
}

/**
* 如果期望值和内存中的值一致,就将新值赋值给内存中的变量,并且返回 true,否则返回 false
* <p>
* 注意:这个操作实际是 CPU 层面的,但是这里没法直接操作 CPU,所以直接用 synchronized 保证原子性
*
* @param expectNum 期望值
* @param newNum 新值
*/
public synchronized boolean compareAndSwap(int expectNum, int newNum) {
if (expectNum == this.count) {
this.count = newNum;
return true;
}
return false;
}
}

public static void main(String[] args) throws InterruptedException {
long time = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(100);
Storage storage = new Storage();

// 创建 10个线程
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
storage.add();
}
countDownLatch.countDown();
}).start();
}

countDownLatch.await();
System.out.println("当前数量为:" + storage.count);
System.out.println("用时:" + (System.currentTimeMillis() - time));
}
}

AtomicInteger 实现 CAS

上面的那个例子只是为了演示 CAS 的工作,所以用的还是 synchronized,实际一般用的都是 AtomicInteger 工具类来实现 CAS,它里面调用的是 Unsafe 里面的方法,这些 Unsafe 方法是 CPU 层面保证原子性的(要求必须一个时间片内执行完)

注意:JUC 包里面还有其它的 Atomic 的工具类,例如 AtomicLong 之类的,总之它是用来保证原子性的工具类

public final native boolean compareAndSwapObject(Object o, long offset,  Object expected, Object update);

public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);

public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);

对上面的操作进行修改

public class Temp {
static class Storage {
// 设置初始值为0
AtomicInteger count = new AtomicInteger(0);

public void add() {
while (!count.compareAndSet(count.get(), count.get() + 1)) {
Thread.onSpinWait();
}
}
}

public static void main(String[] args) throws InterruptedException {
long time = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(100);
Storage storage = new Storage();

// 创建 10个线程
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
storage.add();
}
countDownLatch.countDown();
}).start();
}

countDownLatch.await();
System.out.println("当前数量为:" + storage.count);
System.out.println("用时:" + (System.currentTimeMillis() - time));
}
}

不过都用上 AtomicInteger 了就没必要手动使用自旋了,这里直接用内置的方法

while (!count.compareAndSet(count.get(), count.get() + 1)) {
Thread.onSpinWait();
}

// 改成
count.getAndIncrement();

CAS 的适用场景

CAS 适用于写比较少的情况下(多读场景,冲突一般较少),synchronized 适用于写比较多的情况下(多写场景,冲突一般较多)。

对于资源竞争较少(线程冲突较轻)的情况,使用 synchronized 同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗 CPU 资源;而 CAS 基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。

对于资源竞争严重(线程冲突严重)的情况,CAS 自旋的概率会比较大,从而浪费更多的 CPU 资源,效率低于 synchronized

补充: synchronized 在 JDK1.6 之后,已经改进优化。synchronized 的底层实现主要依靠 Lock-Free 的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁(就是四个状态),稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和 CAS 类似的性能;而线程冲突严重的情况下,性能远高于 CAS。

ABA 问题

参考资料 解决java多线程ABA问题的AtomicStampedReference

ABA 问题是 CAS 会出现的一种问题,在 CAS 操作时,其他线程将变量值 A 改为了 B,过会又将 B 改成 A(就是改了两次改回了 A),等到本线程使用期望值 A 与当前变量进行比较时,发现变量 A 没有变,于是 CAS 就将 A 值进行了交换操作,但是实际上该值已经被其他线程改变过,而一般情况应该是与期望值不符。

举个网上的例子:一个小偷,把别人家的钱偷了之后又还了回来,还是原来的钱吗,你老婆出轨之后又回来,还是原来的老婆嘛?ABA问题也一样,如果不好好解决就会带来大量的问题。最常见的就是资金问题,也就是别人如果挪用了你的钱,在你发现之前又还了回来。但是别人却已经触犯了法律。

ABA 问题的解决思路是 添加一个版本号(或者叫 Stamp 印记),每次变量更新的时候把变量的版本号加 1,那么 A-B-A 就会变成 A1-B2-A3,只要变量被某一线程修改过,改变量对应的版本号就会发生递增变化,从而解决了ABA问题。

java.util.concurrent.atomic 包中提供了 AtomicStampedReference 来解决 ABA 问题,该类的 compareAndSet 是该类的核心方法,实现如下:

public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;

return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

该类检查了当前引用与当前标志是否与预期相同,如果全部相等,才会以原子方式将该引用和该标志的值设为新的更新值,这样 CAS 操作中的比较就不依赖于变量的值了。

ABA 修改版本号的例子

具体的使用例子(就是需要修改版本号)

给出一个 ABA 的例子,对 ABA 问题进行场景重现。

public class Temp {

static void sleep(int time) {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
AtomicInteger index = new AtomicInteger(10);

new Thread(() -> {
index.compareAndSet(10, 11);
index.compareAndSet(11, 10);
System.out.println(Thread.currentThread().getName() + ": 10->11->10");
}, "张三").start();


new Thread(() -> {
sleep(20);
boolean isSuccess = index.compareAndSet(10, 12);
System.out.println(Thread.currentThread().getName() + " 修改是否成功: " + isSuccess);
System.out.println("设置的新值是:" + index.get());
}, "李四").start();
}
}
张三: 10->11->10
李四 修改是否成功: true
设置的新值是:12

上面的李四发现 index 的值没有变化,于是还设置了新值 12

使用 AtomicStampedReference 解决这个问题的。

public class Temp {

static void sleep(int time) {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
// 第二个参数是版本号
AtomicStampedReference<Integer> stampRef = new AtomicStampedReference<>(10, 1);


new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 第1次版本号: " + stampRef.getStamp());

stampRef.compareAndSet(10, 11, stampRef.getStamp(), stampRef.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " 第2次版本号: " + stampRef.getStamp());

stampRef.compareAndSet(11, 10, stampRef.getStamp(), stampRef.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " 第3次版本号: " + stampRef.getStamp());
}, "张三").start();


new Thread(() -> {
// 先取得版本号
int stamp = stampRef.getStamp();
System.out.println(Thread.currentThread().getName() + " 第1次版本号: " + stamp);
sleep(20);

boolean isSuccess = stampRef.compareAndSet(10, 12, stamp, stampRef.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " 修改是否成功: " + isSuccess + " 当前版本 :" + stampRef.getStamp());
System.out.println(Thread.currentThread().getName() + " 当前实际值: " + stampRef.getReference());

}, "李四").start();
}
}
李四 第1次版本号: 1
张三 第1次版本号: 1
张三 第2次版本号: 2
张三 第3次版本号: 3
李四 修改是否成功: false 当前版本 :3
李四 当前实际值: 10